package logger

import (
	"encoding/json"
	"fmt"
	"gitee.com/zhucheer/orange/cfg"
	"gitee.com/zhucheer/orange/internal"
	"github.com/Shopify/sarama"
	"time"
)

// kafkaOption holds the kafka push settings resolved from app config
// (see getKafkaOption for the config keys and their defaults).
type kafkaOption struct {
	appName   string   // application name; used as the sarama ClientID and as the topic fallback
	endpoints []string // kafka broker addresses
	topic     string   // target topic; falls back to appName when unset in config
	password  string   // SASL password (only used when username is non-empty)
	username  string   // SASL username; empty disables SASL
	version   string   // kafka protocol version string, e.g. "2.0.0"
}

// kafkaLogger couples the resolved kafka options with the sarama async producer.
type kafkaLogger struct {
	option   *kafkaOption
	producer sarama.AsyncProducer // nil until registerKafkaProducer connects successfully
}

var kafkaIns *kafkaLogger

// 将结构化日志信息同步到kafka
func initKafkaHandler() {
	option := getKafkaOption()
	if option == nil {
		return
	}
	go registerKafkaProducer(option)
	internal.ConsoleLog(fmt.Sprintf("logger kafka push start topic:[%s]", option.topic))
}

// 推送日志到kafka
func sendKafkaLog(record *Record) {
	if kafkaIns == nil || kafkaIns.producer == nil {
		return
	}
	logList := make(map[string]interface{})
	logList = map[string]interface{}{
		"level":    record.LevelString,
		"message":  record.Message,
		"timte":    record.Time,
		"funcName": record.FuncName,
		"file":     record.FileName,
		"app":      kafkaIns.option.appName,
		"line":     fmt.Sprintf("%d", record.Line),
	}
	for _, item := range record.KvField {
		logList[item.Key] = fmt.Sprintf("%v", item.Value)
	}

	recordJson, _ := json.Marshal(logList)
	msg := &sarama.ProducerMessage{
		Topic:     kafkaIns.option.topic,
		Value:     sarama.ByteEncoder(recordJson),
		Timestamp: time.Now(),
	}
	kafkaIns.producer.Input() <- msg
	return
}

// 获取kafka配置
func getKafkaOption() (option *kafkaOption) {
	if cfg.Config == nil || cfg.Config.Exists("app.logger.kafka") == false {
		return
	}
	appName := cfg.GetString("app.name", "orange")
	option = &kafkaOption{
		endpoints: cfg.GetSliceString("app.logger.kafka.endpoints", []string{}),
		topic:     cfg.GetString("app.logger.kafka.topic", ""),
		username:  cfg.GetString("app.logger.kafka.username", ""),
		password:  cfg.GetString("app.logger.kafka.password", ""),
		version:   cfg.GetString("app.logger.kafka.version", "2.0.0"),
		appName:   appName,
	}
	if option.topic == "" {
		option.topic = appName
	}

	if len(option.endpoints) == 0 || option.topic == "" {
		return nil
	}

	return
}

// registerKafkaProducer 注册同步类型实例
func registerKafkaProducer(connOpt *kafkaOption) {
	kafkaIns = &kafkaLogger{
		option: connOpt,
	}
	kfkVersion, _ := sarama.ParseKafkaVersion(connOpt.version)
	if validateVersion(kfkVersion) == false {
		kfkVersion = sarama.V2_4_0_0
	}

	brokers := connOpt.endpoints
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.NoResponse
	// 随机向partition发送消息
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
	config.Producer.Return.Successes = false
	config.Producer.Return.Errors = false
	config.Producer.Compression = sarama.CompressionNone
	config.ClientID = connOpt.appName

	config.Version = kfkVersion
	if connOpt.username != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = connOpt.username
		config.Net.SASL.Password = connOpt.password
	}

	var err error
	kafkaIns.producer, err = sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic(err)
	}
}

// CloseProducer shuts down the async kafka producer, flushing buffered
// messages. Safe to call when kafka logging was never initialized.
// The close error was previously discarded; it is now surfaced on the console.
func CloseProducer() {
	if kafkaIns == nil || kafkaIns.producer == nil {
		return
	}
	if err := kafkaIns.producer.Close(); err != nil {
		internal.ConsoleLog(fmt.Sprintf("logger kafka producer close error: %s", err))
	}
}

// validateVersion 验证版本是否有效
func validateVersion(version sarama.KafkaVersion) bool {
	for _, item := range sarama.SupportedVersions {
		if version.String() == item.String() {
			return true
		}
	}
	return false
}
